package com.niit.streaming

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object Spark_Stream_Close {

  /**
   * Demonstrates a gracefully stoppable Spark Streaming application.
   *
   * The streaming job (socket word stream on localhost:9999) runs until an
   * external "off switch" — the HDFS marker file `/stopSpark` — appears, at
   * which point the context is stopped gracefully and the JVM exits.
   */
  def main(args: Array[String]): Unit = {

    // Recover the context from the "BD2" checkpoint directory if one exists,
    // otherwise build a fresh one. The ENTIRE DStream graph (inputs,
    // transformations, outputs) and the checkpoint setting must be defined
    // inside this creation function: adding operations to a context that was
    // recovered from a checkpoint throws IllegalStateException, and defining
    // the graph again outside would open a second receiver on the same port.
    val ssc = StreamingContext.getActiveOrCreate("BD2", () => {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark")
      val newSsc = new StreamingContext(sparkConf, Seconds(3))
      newSsc.checkpoint("BD2") // enable checkpointing so recovery works
      val lines = newSsc.socketTextStream("localhost", 9999)
      val wordOne = lines.map((_, 1))
      wordOne.print()
      newSsc
    })

    ssc.sparkContext.setLogLevel("ERROR")

    ssc.start()

    // Spark Streaming should be stopped via an external switch rather than
    // from inside a batch. Common choices:
    //   * HDFS  -- stop as soon as a marker file appears (used here)
    //   * MySQL -- poll a status column: "off" means stop, "on" means keep running
    new Thread(
      new Runnable {
        override def run(): Unit = {
          // Connect to HDFS as user "root"; never reassigned, so a val.
          val fs = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(), "root")
          while (true) {

            Thread.sleep(2000) // poll the shutdown condition every 2 seconds

            // The marker file is the external off switch.
            val stopRequested: Boolean = fs.exists(new Path("hdfs://node1:8020/stopSpark"))
            // Only an ACTIVE context can be stopped.
            val state = ssc.getState()
            if (stopRequested && state == StreamingContextState.ACTIVE) {
              // stopSparkContext = also shut down the underlying SparkContext
              // stopGracefully   = finish processing already-received data first
              ssc.stop(stopSparkContext = true, stopGracefully = true)
              System.exit(0) // terminate the JVM once streaming has stopped
            }

          }

        }
      }
    ).start()

    ssc.awaitTermination()

  }

}
